/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.dboe.transaction;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.jena.atlas.io.IO;
import org.apache.jena.atlas.lib.FileOps;
import org.apache.jena.atlas.lib.InternalErrorException;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.dboe.transaction.txn.ComponentId;
import org.apache.jena.dboe.transaction.txn.TransactionalComponentLifecycle;
import org.apache.jena.dboe.transaction.txn.TxnId;
import org.apache.jena.query.ReadWrite;

/**
 * A transaction component holding a single {@code long} value, supporting MR+SW
 * (=one writer AND many readers).
 * <p>
 * The last committed value is kept in an {@link AtomicLong}. Each transaction works on
 * its own {@link IntegerState} copy, which is published to the committed value only in
 * {@link #_commitEnd}. When a filename is supplied, the committed value is also written
 * to that file as decimal text.
 */
public class TransInteger extends TransactionalComponentLifecycle<TransInteger.IntegerState> {

    // Last committed value. The initial -1712 is only a placeholder: both construction
    // paths overwrite it (readLocation() for persistent, value.set(v) for in-memory).
    private final AtomicLong value = new AtomicLong(-1712);
    // File holding the persistent state; null means in-memory only.
    private final String filename;

    /** Per transaction state - and per thread safe because we subclass
     * TransactionalComponentLifecycle
     */
    static class IntegerState {
        /** The value as seen, and possibly modified, by the owning transaction. */
        long txnValue;
        public IntegerState(long v) { this.txnValue = v; }
    }

    // Not referenced within this class; retained because it is package-visible.
    static int counter = 0;

    /** In-memory, non persistent, transactional integer with initial value 0. */
    public TransInteger() { this(0L); }

    /**
     * In-memory, non persistent, transactional integer with a locally allocated component id.
     * @param v  Initial value
     */
    public TransInteger(long v) {
        this(v, ComponentId.allocLocal());
    }

    /**
     * In-memory, non persistent, transactional integer.
     * @param v            Initial value
     * @param componentId  Component id
     */
    public TransInteger(long v, ComponentId componentId) {
        // A null filename disables all file reads and writes.
        this(null, componentId);
        value.set(v);
    }

    /** Persistent, transactional integer. The persistent state is held in
     *  filename.  When first initialized, the value is 0L.
     * @param filename  Persistent state
     * @param cid        Component id
     */
    public TransInteger(String filename, ComponentId cid) {
        super(cid);
        this.filename = filename;
        // Set the value now for "fast read" transactions.
        readLocation();
    }

    /**
     * Load the committed value from the state file, if this instance is persistent.
     * If the file does not exist yet, the value is set to 0 and the file is created.
     */
    private void readLocation() {
        if ( filename == null )
            return;
        if ( ! FileOps.exists(filename) ) {
            // First use: initialize to zero and create the file.
            value.set(0L);
            writeLocation();
            return;
        }
        value.set(read(filename));
    }

    /** Write the current committed value to the state file (no-op when in-memory). */
    private void writeLocation() {
        writeLocation(value.get());
    }

    /** Write the given value to the state file (no-op when in-memory). */
    private void writeLocation(long value) {
        if ( filename != null ) {
            write(filename, value);
        }
    }

    //-- Read/write the value
    // This should really be checksum'ed or other internal check to make sure IO worked.

    /**
     * Read a {@code long} stored as decimal text in the named file.
     * Leading and trailing whitespace, including a final newline, is ignored.
     * An I/O failure is logged and rethrown via {@code IO.exception};
     * unparsable content is logged and rethrown as {@link InternalErrorException}.
     */
    private static long read(String filename) {
        try {
            // trim() also removes any trailing newline.
            String str = IO.readWholeFileAsUTF8(filename).trim();
            return Long.parseLong(str);
        }
        catch (IOException ex) {
            Log.error(TransInteger.class, "IOException: " + ex.getMessage(), ex);
            IO.exception(ex);
        }
        catch (NumberFormatException ex) {
            Log.error(TransInteger.class, "NumberformatException: " + ex.getMessage());
            throw new InternalErrorException(ex);
        }
        // Not reached.
        return Long.MIN_VALUE;
    }

    /**
     * Write a {@code long} as decimal text to the named file.
     * An I/O failure is logged and rethrown via {@code IO.exception}, matching
     * {@link #read(String)}, so that a failed persist is never silently ignored.
     */
    private static void write(String filename, long value) {
        try {
            IO.writeStringAsUTF8(filename, Long.toString(value));
        }
        catch (IOException ex) {
            Log.error(TransInteger.class, "IOException: " + ex.getMessage(), ex);
            IO.exception(ex);
        }
    }

    // Set when recover() has applied a value during the current recovery pass.
    private boolean recoveryAction = false;

    @Override
    public void startRecovery() {
        recoveryAction = false;
    }

    /**
     * Apply a recovered entry: the next 8 bytes of {@code ref} are read as the value.
     */
    @Override
    public void recover(ByteBuffer ref) {
        long x = ref.getLong();
        value.set(x);
        recoveryAction = true;
    }

    /** If recovery changed the value, write it through to the state file. */
    @Override
    public void finishRecovery() {
        if ( recoveryAction )
            writeLocation();
        // Leave true as a record.
    }

    @Override
    public void cleanStart() {
        recoveryAction = false;
    }

    /** Increment the value by one. Requires a write transaction. */
    public void inc() {
        requireWriteTxn();
        IntegerState ts = getDataState();
        ts.txnValue++;
    }

    /**
     * Set the value, return the old value. Requires a write transaction.
     * @param x  New value
     * @return the transaction's value before this call
     */
    public long set(long x) {
        requireWriteTxn();
        IntegerState ts = getDataState();
        long v = ts.txnValue;
        ts.txnValue = x;
        return v;
    }

    /** Return the current value in a transaction. */
    public long read() {
        checkTxn();
        return getDataState().txnValue;
    }

    /** Return the current value.
     * If inside a transaction, return the transaction view of the value.
     * If not in a transaction return the state value (effectively
     * a read transaction, optimized by the fact that reading the
     * {@code TransInteger} state is atomic).
     */
    public long get() {
        if ( super.isActiveTxn() )
            return getDataState().txnValue;
        else
            return value.get();
    }

    /** Read the current global state (that is, the last committed value) outside a transaction. */
    public long value() {
        return value.get();
    }

    /** Start a transaction with a private copy of the last committed value. */
    @Override
    protected IntegerState _begin(ReadWrite readWrite, TxnId txnId) {
        return createState();
    }

    /** Create per-transaction state initialized from the last committed value. */
    private IntegerState createState() {
        return new IntegerState(value.get());
    }

    /**
     * On promotion, the incoming state is discarded and a fresh one is created
     * from the last committed value.
     */
    @Override
    protected IntegerState _promote(TxnId txnId, IntegerState state) {
        return createState();
    }

    /**
     * Produce the commit record: the transaction's value as an 8 byte long,
     * or {@code null} for a read transaction.
     */
    @Override
    protected ByteBuffer _commitPrepare(TxnId txnId, IntegerState state) {
        if ( isReadTxn() )
            return null;
        ByteBuffer x = ByteBuffer.allocate(Long.BYTES);
        x.putLong(state.txnValue);
        // NOTE(review): the buffer is returned without flip(), so its position is at
        // the end; presumably the consumer repositions it before reading - confirm.
        return x;
    }

    /** Write the transaction's value to the state file (write transactions only). */
    @Override
    protected void _commit(TxnId txnId, IntegerState state) {
        if ( isReadTxn() )
            return;
        writeLocation(state.txnValue);
    }

    /** Publish the transaction's value as the new committed value (write transactions only). */
    @Override
    protected void _commitEnd(TxnId txnId, IntegerState state) {
        if ( isReadTxn() )
            return;
        value.set(state.txnValue);
    }

    @Override
    protected void _abort(TxnId txnId, IntegerState state) {
        // Nothing: the per-transaction state is simply dropped.
    }

    @Override
    protected void _complete(TxnId txnId, IntegerState state) {
        // Nothing
    }

    @Override
    protected void _shutdown() {
    }

    /** The string form of this component's id. */
    @Override
    public String toString() {
        return String.valueOf(super.getComponentId());
    }
}

